Python thread
介绍
单线程表示程序中只有一个执行流, 程序任一时间只执行一个任务
多线程表示有多个执行流, 单个程序在同一时间执行多个任务
多线程本质是 CPU 多核心, 如 CPU 8 核 12 线程, 表示 8 个物理核心, 其中 4 个核心支持超线程, 每个核心对应 2 个线程
其余 4 个核心每个对应 1 个线程(4×2 + 4×1 = 12), 若硬件仅有 1 个核心且无超线程, 代码层面使用多线程结果依然是单线程(靠线程切换实现)
多线程
python
语言内置 GIL 锁(全局解释器锁, 一种互斥锁), 同一时间只有 1 个线程能执行 Python 字节码, 实际是在多个线程间来回切换
GIL 锁使得多线程在执行 CPU 计算密集任务不能节省时间, 多线程执行 IO 密集任务可以节约时间
import timeit
from threading import Thread
def compute(iterations: int = 100_000_000) -> None:
    """Burn CPU time by spinning through an empty loop.

    Serves as the CPU-bound workload for the single- vs multi-thread
    timing comparison.

    Params:
        iterations: number of empty loop passes to execute. The default
            keeps the original benchmark size, so ``Thread(target=compute)``
            behaves exactly as before; a smaller value gives a quick run.
    """
    for _ in range(iterations):
        pass
def single_compute():
    """Run the CPU-bound workload twice in a row on the calling thread."""
    for _ in range(2):
        compute()
def multi_compute():
    """Run the CPU-bound workload on two threads and wait for both."""
    workers = [Thread(target=compute) for _ in range(2)]
    # Start every worker before joining any, so the two runs overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
# Time a single call of each variant. globals=globals() gives timeit access to
# the functions defined in this module; the "=" specifier echoes the expression
# text in front of the measured value.
print(f"single: {timeit.timeit('single_compute()', number=1, globals=globals()) = }")
print(f"multi: {timeit.timeit('multi_compute()', number=1, globals=globals()) = }")
# For CPU-bound work the multi-threaded time is about the same as the
# single-threaded time: it is like copying two articles by hand -- whether you
# alternate between them or finish one after the other, you are writing the
# whole time.
single: timeit.timeit('single_compute()', number=1, globals=globals()) = 3.9793272567912936
multi: timeit.timeit('multi_compute()', number=1, globals=globals()) = 3.7624008180573583
import timeit
from threading import Thread
from time import sleep
def io(seconds: float = 2) -> None:
    """Simulate a blocking IO call by sleeping.

    Serves as the IO-bound workload for the single- vs multi-thread
    timing comparison.

    Params:
        seconds: how long to block. The default keeps the original
            2-second wait, so ``Thread(target=io)`` behaves as before.
    """
    sleep(seconds)
def single_io():
    """Run the simulated IO workload twice in a row on the calling thread."""
    for _ in range(2):
        io()
def multi_io():
    """Run the simulated IO workload on two threads and wait for both."""
    workers = [Thread(target=io) for _ in range(2)]
    # Start every worker before joining any, so the two waits overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
# Time a single call of each variant. globals=globals() gives timeit access to
# the functions defined in this module.
print(f"single: {timeit.timeit('single_io()', number=1, globals=globals()) = }")
print(f"multi: {timeit.timeit('multi_io()', number=1, globals=globals()) = }")
# IO-bound tasks behave almost like true multithreading (the bottleneck is the
# IO itself; the CPU spends most of its time waiting).
# In real IO workloads, thread-switching overhead, disk IO limits and similar
# factors mean multi-threaded IO is not guaranteed to beat single-threaded IO.
single: timeit.timeit('single_io()', number=1, globals=globals()) = 4.000157259404659
multi: timeit.timeit('multi_io()', number=1, globals=globals()) = 2.000672407448292
python 多线程更大的意义在于创建多个工作流, "同时" 执行多个任务
from threading import Thread
def test():
    """Print a fixed message; used as the target of the demo thread."""
    message = "do some task"
    print(message)
# Create the thread; test() will be its entry point.
t = Thread(target=test)
# Start the thread so test() runs concurrently with the main thread.
t.start()
# Block until the thread has finished.
t.join()
from collections.abc import Callable
from datetime import datetime
from threading import Thread
from time import sleep, time
from typing import Any

from loguru import logger
class Chan:
""" 单个线程封装
Params:
func: callable 可执行函数
params: dict|None=None 函数参数
count: int=1 设定函数执行次数(小于 0 表示无限次数)
timeout: int=0 设定超时时间(小于等于 0 表示无限时间)
interval: int=0 循环执行间隔
"""
def __init__(self,
func: callable,
params: dict|None=None,
count: int=1,
timeout: int=0,
interval: int=0,
) -> None:
self.timeout: int|float = timeout if timeout > 0 else float('inf')
self.count: int|float = int(count) if count > 0 else float('inf')
self.interval: int = interval
self.func: callable = func
self.params: dict = params if params else {}
self._running: bool = True
self.output: dict[datetime, any] = {}
self.thread: Thread = Thread(target=self.function, daemon=True)
self.pid: int = self.thread.native_id
self.alive: bool = self.thread.is_alive()
def function(self) -> None:
""" 线程执行函数 """
if self._running is False:
raise RuntimeError("Thread has already been started")
index, timeout = 0, time() + self.timeout
while True:
if not self._running:
break
self.output.update({datetime.now(): self.func(**(self.params))})
sleep(self.interval)
index += 1
if (index >= self.count or time() > timeout):
self._running = False
break
self.alive = False
def run(self) -> int:
""" 执行线程 """
self.thread.start()
self.pid = self.thread.native_id
self.alive = self.thread.is_alive()
return self.pid
def wait(self, timeout=None) -> dict[datetime, any]:
""" 等待线程结束 """
self.thread.join(timeout)
self.alive = self.thread.is_alive()
return self.output
def stop(self) -> bool:
""" 主动停止线程(当次循环执行完后停止) """
self._running = False
self.alive = self.thread.is_alive()
return self.alive is False
def pool(chans: "list[Chan]", size: int = 5, interval: float = 0.1) -> "list[Chan]":
    """Run the given channels with at most ``size`` of them active at once.

    Channels are started in the order given. Whenever a running channel's
    thread has ended, it is moved to the result list and the next pending
    channel is started in its place. The input list is not modified.

    Params:
        chans: channels to execute; each must not have been started yet.
        size: maximum number of channels running at the same time.
        interval: seconds to sleep between two polling passes.

    Returns:
        The channels in the order in which they were seen to have finished.

    Raises:
        ValueError: if ``size`` is not a positive int.
    """
    if not isinstance(size, int) or size <= 0:
        raise ValueError("size must be int and bigger than 0")
    # Reversed copy so pop() hands the channels out in their original order.
    pending = list(chans)[::-1]
    finished = []
    running = []
    while pending or running:
        # Split into still-running and finished by building a new list
        # rather than popping from the one being iterated. Liveness is read
        # from the thread itself so a stale flag cannot stall the loop.
        still_running = []
        for chan in running:
            if chan.thread.is_alive():
                still_running.append(chan)
            else:
                finished.append(chan)
        running = still_running
        # Fill every free slot from the pending channels.
        while pending and len(running) < size:
            chan = pending.pop()
            chan.run()
            running.append(chan)
        if running:
            sleep(interval)
    return finished
if __name__ == '__main__':
    def wait(second: int):
        """Log the delay, block for ``second`` seconds, then return it."""
        logger.info(f"wait {second} second")
        sleep(second)
        return second

    # Two demo channels: c1 has a 6-second time budget, c2 has none.
    c1 = Chan(wait, dict(second=3), interval=1, timeout=6, count=2)
    c2 = Chan(wait, dict(second=2), interval=1, count=2)

    # Run both with two slots, polling every 0.1 seconds.
    pool([c1, c2], size=2, interval=0.1)

    logger.info(f"{c1.output=}, {c1.alive=}, {c1.pid=}")
    logger.info(f"{c2.output=}, {c2.alive=}, {c2.pid=}")
    logger.info("finish")